package com.offcn.bigdata.spark.p4

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
  * Grouped top-N: within each course, keep the highest-scoring records.
  *
  * This is the optimized variant of the grouped sort: it uses `combineByKey`
  * instead of `groupByKey`, so each combiner is trimmed to at most [[TopN]]
  * records while values are merged rather than after a whole group has been
  * collected.
  */
object _05GroupedSortOptimizationOps {
    import scala.util.Try

    /** Number of top-scoring records retained per course. */
    private final val TopN = 3

    /**
      * Orders scores from highest to lowest.
      *
      * A sorted set treats elements that compare as 0 as duplicates, so comparing
      * on the numeric score alone would silently drop students with tied scores.
      * Ties are therefore broken by name and then by course; only records equal
      * in all three fields are considered the same element.
      * Assumes `name` and `course` are non-null (records built by `parseScore` are).
      */
    private val scoreOrdering: Ordering[Score] = new Ordering[Score] {
        override def compare(x: Score, y: Score): Int = {
            val byScore = y.score.compareTo(x.score)
            if (byScore != 0) {
                byScore
            } else {
                val byName = x.name.compareTo(y.name)
                if (byName != 0) byName else x.course.compareTo(y.course)
            }
        }
    }

    /**
      * Entry point: builds a small in-memory data set of "course name score" lines
      * and prints, for every course, its top [[TopN]] records.
      *
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName(_05GroupedSortOptimizationOps.getClass.getSimpleName)
            .setMaster("local[*]")
        val sc = new SparkContext(conf)

        // Stop the context even when the job fails, so it is not leaked.
        try {
            val lines = sc.parallelize(List(
                "chinese ls 91",
                "english ww 56",
                "chinese zs 90",
                "chinese zl 76",
                "english zq 88",
                "chinese wb 95",
                "chinese sj 74",
                "english ts 87",
                "english ys 67",
                "english mz 77",
                "chinese yj 98",
                "english gk 96"
            ))

            // Malformed lines are skipped instead of being mapped to a null-filled sentinel.
            val scores: RDD[Score] = lines.flatMap(line => parseScore(line).iterator)

            // Find the top-ranked records within each course.
            val course2Score: RDD[(String, Score)] = scores.map(score => (score.course, score))
            course2Score.combineByKey(
                createCombiner,
                mergeValue,
                mergeCombiners
            ).foreach(println)
        } finally {
            sc.stop()
        }
    }

    /**
      * Parses one whitespace-separated "course name score" line.
      *
      * @param line raw input line
      * @return the parsed record, or `None` when the line does not have exactly
      *         three fields or the score field is not a valid integer
      */
    private def parseScore(line: String): Option[Score] =
        line.split("\\s+") match {
            case Array(course, name, rawScore) =>
                Try(rawScore.toInt).toOption.map(value => Score(course, name, value))
            case _ =>
                None
        }

    /**
      * Returns at most the first [[TopN]] elements of `scores` in its own ordering.
      * The input set is returned as-is when it is already within the limit.
      */
    private def keepTop(scores: mutable.TreeSet[Score]): mutable.TreeSet[Score] =
        if (scores.size > TopN) scores.take(TopN) else scores

    /**
      * `combineByKey` initializer: starts a combiner from the first record seen
      * for a key.
      *
      * @param score first record of the key
      * @return a sorted set (highest score first) containing only `score`
      */
    def createCombiner(score: Score): mutable.TreeSet[Score] = {
        val top = mutable.TreeSet.empty[Score](scoreOrdering)
        top += score
        top
    }

    /**
      * `combineByKey` value merger: adds one more record to an existing combiner
      * and trims the result to the top [[TopN]].
      *
      * @param scores combiner accumulated so far (mutated by this call)
      * @param score  next record for the same key
      * @return the combiner holding at most [[TopN]] records
      */
    def mergeValue(scores: mutable.TreeSet[Score], score: Score): mutable.TreeSet[Score] = {
        scores += score
        keepTop(scores)
    }

    /**
      * `combineByKey` combiner merger: folds the second combiner into the first
      * and trims the result to the top [[TopN]].
      *
      * @param scores1 first combiner (mutated by this call)
      * @param scores2 second combiner, whose records are added to `scores1`
      * @return the merged combiner holding at most [[TopN]] records
      */
    def mergeCombiners(scores1: mutable.TreeSet[Score], scores2: mutable.TreeSet[Score]): mutable.TreeSet[Score] = {
        scores1 ++= scores2
        keepTop(scores1)
    }
}
